-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(source): Avro with AWS Glue Schema Registry #17605
Conversation
b1d0c39
to
b67cf20
Compare
832c4e5
to
a4797fd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM! The shape is good. Didn't check business logic carefully.
if format_encode_options_to_consume | ||
.remove(AWS_GLUE_SCHEMA_ARN_KEY) | ||
.is_none() | ||
{ | ||
// Legacy logic that assumes either `schema.location` or confluent `schema.registry`. | ||
// The handling of newly added aws glue is centralized in `connector::parser`. | ||
// TODO(xiangjinwu): move these option parsing to `connector::parser` as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we need to add logic to check AWS_GLUE_SCHEMA_ARN_KEY
is not specified together with other 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just tested and when they are specified together:
NOTICE: Get unknown format_encode_options for Plain Avro: schema.location
So it is already covered by the unknown options check, just not fatal. After the config parsing refactor #17675 we can try to make unknown options a fatal error.
pub trait GlueSchemaCache { | ||
/// Gets the a specific schema by id, which is used as *writer schema*. | ||
async fn get_by_id(&self, schema_version_id: uuid::Uuid) -> ConnectorResult<Arc<Schema>>; | ||
/// Gets the latest schema by arn, which is used as *reader schema*. | ||
async fn get_by_name(&self, schema_arn: &str) -> ConnectorResult<Arc<Schema>>; | ||
} | ||
|
||
#[derive(Debug)] | ||
pub enum GlueSchemaCacheImpl { | ||
Real(RealGlueSchemaCache), | ||
Mock(MockGlueSchemaCache), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't a good style to me: the trait seems a little unnecessary to me, but anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary as a tool (generic) but it is a doc to make sure RealGlueSchemaCache
and MockGlueSchemaCache
have the same interface.
The unnecessary part is the enum, as we can simply Box<dyn GlueSchemaCache>
. This aligns more with mocked interfaces in Java or Go, but in rust we always prefer static dispatch with enum rather than dynamic dispatch (?)
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Before this PR, there is a boolean
use_schema_registry
that differentiates usage ofschema.location
vs (confluent)schema.registry
.In this PR, we introduce AWS Glue schema registry with
aws.glue.schema_arn
option, and the boolean is deprecated in favor ofenum
s.hide whitespace
becauseif-else
are updated tomatch
.glue.slt
and majority ofglue_resolver.rs
are for tests.catalog.proto
,StreamSourceInfo
still hasuse_schema_registry
for backward compatibility.src/frontend/src/handler/create_source.rs
andsrc/connector/src/parser/mod.rs
, the boolean has been replaced byenum SchemaLocation
.src/connector/src/parser/avro/parser.rs
, theOption<Arc<ConfluentSchemaCache>>
has been replaced byenum WriterSchemaCache
.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Added support for AWS Glue Schema Registry when using
encode avro
for sources:When using IAM permission policies, the following
action
s shall be allowed:glue:GetSchemaVersion